package com.fwmagic.dynamic_rule.functions;

import com.alibaba.fastjson.JSON;
import com.fwmagic.dynamic_rule.bean.*;
import com.fwmagic.dynamic_rule.service.QueryRouterServiceV4;
import com.fwmagic.dynamic_rule.utils.RuleSimulator;
import com.fwmagic.dynamic_rule.utils.RuleStateDescUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.state.*;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
import org.kie.api.io.ResourceType;
import org.kie.api.runtime.KieSession;
import org.kie.internal.utils.KieHelper;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/**
 * 规则处理核心逻辑V5(Flink State && ClickHouse & Cache(redis))
 * 加入Drools动态规则引擎
 */
@Slf4j
public class RuleProcessFunctionV5 extends KeyedBroadcastProcessFunction<String, LogBean, String, ResultBean> {

    private ListState<LogBean> evnetsState;

    @Override
    public void open(Configuration parameters) throws Exception {
        //准备一个底层明细事件的State
        //设置State中只存储最近两个小时的数据
        ListStateDescriptor<LogBean> eventsStateDesc = RuleStateDescUtils.eventsStateDesc;
        StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.hours(2)).updateTtlOnCreateAndWrite().build();
        eventsStateDesc.enableTimeToLive(stateTtlConfig);
        evnetsState = getRuntimeContext().getListState(eventsStateDesc);
    }

    /**
     * 核心计算方法
     *
     * @param logBean
     * @param readOnlyContext
     * @param collector
     * @throws Exception
     */
    @Override
    public void processElement(LogBean logBean, ReadOnlyContext readOnlyContext, Collector<ResultBean> collector) throws Exception {
        log.info("收到一条事件数据:{}", JSON.toJSONString(logBean));
        //将收到的事件放入历史明细state存储中
        evnetsState.add(logBean);

        //获取mapState
        ReadOnlyBroadcastState<String, RuleStateBean> mapState = readOnlyContext.getBroadcastState(RuleStateDescUtils.ruleKieStateDesc);

        Iterable<Map.Entry<String, RuleStateBean>> entries = mapState.immutableEntries();
        for (Map.Entry<String, RuleStateBean> entry : entries) {
            String ruleName = entry.getKey();
            RuleStateBean ruleStateBean = entry.getValue();
            //取出sql
            String cntSqls = ruleStateBean.getCntSqls();
            String seqSqls = ruleStateBean.getSeqSqls();

            //取出kieSession
            KieSession kieSession = ruleStateBean.getKieSession();

            //构造RuleParam对象
            RuleParam ruleParam = new RuleParam();
            ruleParam.setRuleName(ruleName);

            //放入cntSql
            String[] sqls = cntSqls.split(";");
            ArrayList<RuleAtomicParam> countParams = new ArrayList<>();
            for (String sql : sqls) {
                RuleAtomicParam ruleAtomicParam = new RuleAtomicParam();
                ruleAtomicParam.setActionCountQuerySql(sql);
                countParams.add(ruleAtomicParam);
            }
            //将封装好的countParams放入ruleParam
            ruleParam.setUserActionCountParam(countParams);

            //放入seqSql
            ruleParam.setActionSequenceQuerySql(seqSqls);

            //构建一个QueryRouter
            QueryRouterServiceV4 queryRouter = new QueryRouterServiceV4(evnetsState);

            DroolFact fact = new DroolFact(logBean, ruleParam, queryRouter, false);

            //将fat插入kieSession中
            kieSession.insert(fact);

            //启动规则
            kieSession.fireAllRules();

            //判断计算后的结果是否匹配
            if (fact.isMatch()) {
                collector.collect(new ResultBean(ruleName, logBean.getDeviceId(), logBean.getTimeStamp()));
            }
        }
    }


    /**
     * 处理输入的规则操作信息，是canal从mysql中监听到并写入kafka的json数据
     *
     * @param json
     * @param context
     * @param collector
     * @throws Exception
     */
    @Override
    public void processBroadcastElement(String json, Context context, Collector<ResultBean> collector) throws Exception {
        if (StringUtils.isBlank(json)) {
            return;
        }

        //解析json成对象
        RuleCanalBean ruleCanalBean = JSON.parseObject(json, RuleCanalBean.class);

        //只接收INSERT操作的数据，其他事件过滤掉
        if (!"INSERT".equals(ruleCanalBean.getType())) return;
        log.info("收到一条规则数据:{}", json);

        BroadcastState<String, RuleStateBean> mapState = context.getBroadcastState(RuleStateDescUtils.ruleKieStateDesc);


        //从canal信息中，取到规则表的行数据List(id,规则名称，规则代码，cntsql,seqsql)
        List<RuleTableRecord> data = ruleCanalBean.getData();
        if (CollectionUtils.isEmpty(data)) return;
        //从行数据List中取到第一行(其实就只有一行)(id,规则名称，规则代码，cntsql,seqsql)
        RuleTableRecord tableRecord = data.get(0);
        String ruleName = tableRecord.getRule_name();

        //如果status=1，则做新增规则的操作
        if (tableRecord.getRule_status() == 1) {
            RuleStateBean ruleStateBean = new RuleStateBean();
            ruleStateBean.setRuleName(ruleName);

            //插入cntsql
            ruleStateBean.setCntSqls(tableRecord.getCnt_sqls());

            //插入seqsql
            ruleStateBean.setSeqSqls(tableRecord.getSeq_sqls());

            //插入kieSession
            KieSession kieSession = new KieHelper()
                    .addContent(tableRecord.getRule_code(), ResourceType.DRL)
                    .build()
                    .newKieSession();

            ruleStateBean.setKieSession(kieSession);

            //把ruleStateBean放入State
            mapState.put(ruleName, ruleStateBean);
        } else {//否则只有删除这种情况，则做删除操作
            mapState.remove(ruleName);
        }
    }
}
